package cn.zhangfusheng.elasticsearch.template;

import cn.zhangfusheng.elasticsearch.exception.GlobalSystemException;
import cn.zhangfusheng.elasticsearch.model.es.PutPipeline;
import cn.zhangfusheng.elasticsearch.thread.ThreadLocalDetail;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

/**
 * Convenience wrappers around the Elasticsearch ingest (pipeline) API.
 * <p>
 * Every method delegates to {@code restHighLevelClient().ingest()} using the
 * request options supplied by {@link ThreadLocalDetail#requestOptions()}, and
 * rethrows any {@link IOException} wrapped in a {@link GlobalSystemException}.
 *
 * @author fusheng.zhang
 * @date 2022-04-23 11:10:54
 */
public interface TemplageIngestApi extends Template {

    Logger log = LoggerFactory.getLogger(TemplageIngestApi.class);

    /**
     * Fetches the configurations of the given ingest pipelines.
     *
     * @param id ids of the pipelines to look up
     * @return the pipelines reported by the client response
     * @throws GlobalSystemException if the request fails with an {@link IOException}
     */
    default List<PipelineConfiguration> getPipeline(String... id) {
        GetPipelineRequest getPipelineRequest = new GetPipelineRequest(id);
        // Render the ids as ONE argument: handing the raw array to the logger would
        // spread it over the varargs and only the first id would fill the placeholder.
        log.debug("getPipeLine:{}", Arrays.toString(id));
        try {
            GetPipelineResponse response = restHighLevelClient().ingest()
                    .getPipeline(getPipelineRequest, ThreadLocalDetail.requestOptions());
            return response.pipelines();
        } catch (IOException e) {
            throw new GlobalSystemException(e);
        }
    }

    /**
     * Creates the ingest pipeline described by {@code putPipeline} if no pipeline
     * with the same id is returned by {@link #getPipeline(String...)}.
     * <p>
     * When a pipeline with that id is already present this method does nothing;
     * the existing definition is not updated.
     *
     * @param putPipeline pipeline id and JSON source to register
     * @throws GlobalSystemException if the request fails with an {@link IOException}
     *                               or the creation is not acknowledged
     */
    default void putPipeline(PutPipeline putPipeline) {
        List<PipelineConfiguration> existing = this.getPipeline(putPipeline.getId());
        if (!CollectionUtils.isEmpty(existing)) {
            // Already registered: create-if-absent semantics, nothing to do.
            return;
        }
        PutPipelineRequest putPipelineRequest = new PutPipelineRequest(
                putPipeline.getId(),
                new BytesArray(putPipeline.source().getBytes(StandardCharsets.UTF_8)),
                XContentType.JSON);
        log.debug("createPipeline:{}", putPipelineRequest);
        try {
            AcknowledgedResponse acknowledgedResponse = restHighLevelClient().ingest()
                    .putPipeline(putPipelineRequest, ThreadLocalDetail.requestOptions());
            if (!acknowledgedResponse.isAcknowledged()) {
                throw new GlobalSystemException("创建管道失败");
            }
        } catch (IOException e) {
            throw new GlobalSystemException(e);
        }
    }

    /**
     * Deletes the ingest pipeline with the given id.
     *
     * @param id id of the pipeline to delete
     * @throws GlobalSystemException if the request fails with an {@link IOException}
     *                               or the deletion is not acknowledged
     */
    default void delPipeline(String id) {
        DeletePipelineRequest deletePipelineRequest = new DeletePipelineRequest(id);
        try {
            AcknowledgedResponse acknowledgedResponse = restHighLevelClient().ingest()
                    .deletePipeline(deletePipelineRequest, ThreadLocalDetail.requestOptions());
            if (!acknowledgedResponse.isAcknowledged()) {
                throw new GlobalSystemException("删除管道失败");
            }
        } catch (IOException e) {
            throw new GlobalSystemException(e);
        }
    }
}
